package com.RateLimiter;

import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static java.lang.Thread.currentThread;

/**
 * Created by Lxk on 2019/5/3.
 */
public class Bucket {

    private final ConcurrentLinkedQueue<Integer> container = new ConcurrentLinkedQueue<>();

    private static final int BUCKET_LIMIT = 1000;

    private final RateLimiter limiter = RateLimiter.create(10);

    private final Monitor offerMonitor = new Monitor();

    private final Monitor pollMonitor = new Monitor();

    public void submit(Integer data){
        if(offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){
            try{
                System.out.println(currentThread() + " submit " + data);
                container.offer(data);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                offerMonitor.leave();
            }
        }else{
            throw new IllegalStateException("The bucket is full");
        }
    }

    public void takeThenConsumer(Consumer<Integer> consumer){
        if(pollMonitor.enterIf(pollMonitor.newGuard(()->!container.isEmpty()))){
            try{
                System.out.println(currentThread() + "waiting " + limiter.acquire());
                consumer.accept(container.poll());
            }catch (Exception e){
                e.printStackTrace();
            }finally{
                pollMonitor.leave();
            }
        }
    }




    public static void main(String[] args) {
        final Bucket bucket = new Bucket();
        final AtomicInteger DATA_CREATOR = new AtomicInteger(0);

        IntStream.range(0,5).forEach(i->{
            new Thread(()->
            {
                for(; ; ){
                    int data = DATA_CREATOR.getAndIncrement();
                    bucket.submit(data);
                    try{
                        TimeUnit.MILLISECONDS.sleep(200);
                    }catch (Exception e){
                        if(e instanceof IllegalStateException){
                            System.out.println(e.getMessage());
                        }
                    }
                }
            }).start();
        });

        IntStream.range(0,5).forEach(i->{
            new Thread(()->
            {
                for(; ; ){
                    bucket.takeThenConsumer(consumer-> System.out.println(currentThread() + " handle " + consumer));
                }
            }).start();
        });
    }

}
